fix(kafkajs): include kafka_cluster_id in DSM backlog offset tracking #7569
robcarlan-datadog wants to merge 6 commits into master from rob.carlan/DSMON-1226/kafkajs-dsm-backlog-cluster-id
Conversation
DSM checkpoints correctly included kafka_cluster_id in edge tags, but the backlog/offset tracking (which feeds lag metrics like data_streams.kafka.lag_messages and data_streams.kafka.lag_seconds) did not. This caused cross-cluster offset mixing when the same topic exists on multiple Kafka clusters, producing wildly incorrect lag values.

Thread clusterId through to setOffset calls for both producer and consumer commit paths so that backlog entries are scoped per cluster.

DSMON-1226

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Overall package size

Self size: 4.79 MB

Dependency sizes

| name | version | self size | total size |
|------|---------|-----------|------------|
| import-in-the-middle | 2.0.6 | 81.92 kB | 816.75 kB |
| dc-polyfill | 0.1.10 | 26.73 kB | 26.73 kB |

🤖 This report was automatically generated by heaviest-objects-in-the-universe
Codecov Report

❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #7569 +/- ##
==========================================
- Coverage 80.32% 80.29% -0.03%
==========================================
Files 733 733
Lines 31546 31570 +24
==========================================
+ Hits 25338 25349 +11
- Misses 6208 6221 +13
Benchmarks

Benchmark execution time: 2026-02-23 15:45:52

Comparing candidate commit a304de5 in PR branch

Found 0 performance improvements and 0 performance regressions! Performance is the same for 225 metrics, 25 unstable metrics.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```js
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
  const wrapConsume = (clusterId) => {
    resolvedClusterId = clusterId
```
This relies on an implementation-detail assumption: that COMMIT_OFFSETS always happens in the context of a run, synchronously, and that no two runs can run concurrently. Have you validated that this assumption is correct? If yes, I would add a comment clarifying that, as it's critical for this to work properly and for future readers.
That's a good point. I checked that that's the case and added a comment.

There should be only one run, and COMMIT_OFFSETS only happens in the offsetManager, which is only used within the context of run.
…f github.com:DataDog/dd-trace-js into rob.carlan/DSMON-1226/kafkajs-dsm-backlog-cluster-id
Force-pushed from fa7ed0e to a304de5
Summary
- DSM checkpoints correctly included `kafka_cluster_id` in edge tags, but the backlog/offset tracking (which feeds `data_streams.kafka.lag_messages` and `data_streams.kafka.lag_seconds`) did not include it
- Thread `clusterId` through to `setOffset()` calls for both producer and consumer commit paths so that backlog entries are properly scoped per cluster

Changes
- `packages/datadog-instrumentations/src/kafkajs.js`: Capture resolved `clusterId` in closure and include it in consumer COMMIT_OFFSETS event data
- `packages/datadog-plugin-kafkajs/src/producer.js`: Extract `clusterId` from context and pass to `transformProduceResponse`, include `kafka_cluster_id` in backlog
- `packages/datadog-plugin-kafkajs/src/consumer.js`: Extract `clusterId` from commit items and include `kafka_cluster_id` in backlog
- `packages/datadog-plugin-kafkajs/test/dsm.spec.js`: Assert `kafka_cluster_id` is present in backlog entries when cluster ID is available

Customer impact
Customers with the same topic on multiple Kafka clusters saw `data_streams.kafka.lag_messages` oscillate between cluster offsets (e.g., ~60k and ~84k), producing ~23k phantom lag messages. All cluster-scoped metrics (CloudWatch, DD Agent) showed 0-1 messages of lag during the same window.

Testing
Ran DSM without the fix and verified there was no kafka_cluster_id tag on the lag_messages and lag_seconds metrics:

Ran DSM with the fix and verified the kafka_cluster_id appeared:

🤖 Generated with Claude Code